package com.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.common.GlobalConfig;
import com.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @Description: TODO QQ1667847363
 * @author: xiao kun tai
 * @date:2022/1/5 13:08
 */

// Data flow: web/app -> Nginx -> Spring Boot -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Kafka (DWM)
// Programs:  mockLog -> Nginx -> Logger.sh -> Kafka (ZK) -> BaseLogApp -> Kafka (ZK) -> UserJumpDetailApp -> Kafka (ZK)
public class UserJumpDetailApp {

    /**
     * Builds and runs the user-jump detection job.
     *
     * <p>Reads page logs from the {@code dwd_page_log} topic and keys them by device id
     * ({@code common.mid}). It then uses Flink CEP to find session-entry pages, meaning pages whose
     * {@code page.last_page_id} is missing or empty. A session-entry record is emitted as a "jump" in
     * two cases:
     * <ul>
     *   <li>it is immediately followed, for the same device, by another session-entry page
     *       (matched pattern); or</li>
     *   <li>no second session-entry page arrives for it within 10 seconds of event time
     *       (timed-out partial match).</li>
     * </ul>
     * In both cases the first session-entry record is written, as a JSON string, to the
     * {@code dwm_user_jump_detail} topic and also printed to stdout.
     *
     * @param args unused
     * @throws Exception if the Flink job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // In production, keep this equal to the number of Kafka partitions.

        // Checkpointing is disabled here. To resume after a stop, enable the block below and
        // restart the job from a checkpoint or savepoint.
        /*
        // Enable checkpoints and use a filesystem state backend (options: memory / fs / rocksdb).
        env.setStateBackend(new FsStateBackend("hdfs://192.168.88.109:9820/gmall-flink/ck"));
        // Take a checkpoint every 5 seconds.
        env.enableCheckpointing(5000L);
        // Checkpoint consistency semantics.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        // Keep the last checkpoint when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy: up to 3 attempts, 2 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // HDFS user name.
        System.setProperty("HADOOP_USER_NAME", "root");
        */

        // 2. Create a stream from the Kafka source topic.
        String groupId = "user_jump_detail_app" + GlobalConfig.NUMBER;
        String sourceTopic = "dwd_page_log";
        String sinkTopic = "dwm_user_jump_detail";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        // 3. Parse each line into a JSONObject and take event time from its "ts" field.
        //    Records may arrive out of order, so the watermark allows 1 second of lateness.
        SingleOutputStreamOperator<JSONObject> jsonObj = kafkaDS
                .map(JSON::parseObject)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long recordTimestamp) {
                                return jsonObject.getLong("ts");
                            }
                        }));

        // 4. Pattern: two strictly consecutive session-entry pages within 10 seconds.
        //    times(2).consecutive() is equivalent to
        //    begin("start").where(cond).next("next").where(cond).within(...).
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start")
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) {
                        return isSessionStart(jsonObject);
                    }
                })
                .times(2)
                .consecutive() // strict contiguity, same as next()
                .within(Time.seconds(10));

        // 5. Apply the pattern to the stream, keyed by device id.
        PatternStream<JSONObject> patternStream = CEP.pattern(
                jsonObj.keyBy(json -> json.getJSONObject("common").getString("mid")),
                pattern);

        // 6. Extract matched and timed-out sequences. Both emit the first "start" record.
        //    Timed-out partial matches are sent to a side output.
        OutputTag<JSONObject> timeOutTag = new OutputTag<JSONObject>("time-out") {
        };

        SingleOutputStreamOperator<JSONObject> selectDS = patternStream.select(
                timeOutTag,
                new PatternTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject timeout(Map<String, List<JSONObject>> map, long timeoutTimestamp) {
                        return map.get("start").get(0);
                    }
                },
                new PatternSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject select(Map<String, List<JSONObject>> map) {
                        return map.get("start").get(0);
                    }
                });
        DataStream<JSONObject> timeOutDS = selectDS.getSideOutput(timeOutTag);

        // 7. Union both kinds of jump events.
        DataStream<JSONObject> unionDS = selectDS.union(timeOutDS);

        // 8. Print and write the results to Kafka.
        System.out.println("任务开始....................");
        unionDS.print(">>>>>>>>>>>>>");
        // A lambda is used here because JSONObject::toJSONString is ambiguous: it could refer to
        // either the static JSON.toJSONString(Object) or the instance toJSONString().
        unionDS.map(json -> json.toJSONString())
                .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        // 9. Launch the job.
        env.execute("UserJumpDetailApp");
    }

    /**
     * Returns whether a page-log record is the first page of a session, i.e. its
     * {@code page.last_page_id} is absent or empty.
     *
     * <p>A record with no {@code page} object is treated as not starting a session. Without this
     * check such a record would throw a NullPointerException inside the CEP operator.
     *
     * @param log a parsed page-log record
     * @return {@code true} if the record has a {@code page} object with no non-empty
     *     {@code last_page_id}
     */
    private static boolean isSessionStart(JSONObject log) {
        JSONObject page = log.getJSONObject("page");
        if (page == null) {
            return false;
        }
        String lastPageId = page.getString("last_page_id");
        return lastPageId == null || lastPageId.isEmpty();
    }
}
